[SPARK-21093][R] Terminate R's worker processes in the parent of R's daemon to prevent a leak #18320
Conversation
|
cc @felixcheung, @shivaram and @MLnick. |
|
Hmm, I'm not sure - I'm pretty sure the session / spark context is already initialized when this test is run, and would changing the setting here affect the daemon process that is already running?
|
|
Yes, I believe you are correct and the daemon is already running, but it avoids using the problematic daemon - spark/core/src/main/scala/org/apache/spark/api/r/RRunner.scala Lines 363 to 392 in 478fbc8
|
|
Test build #78147 has finished for PR 18320 at commit
|
|
Test build #78148 has finished for PR 18320 at commit
|
|
thx - I think more importantly, does the issue manifest when someone manually calls gapply in a similar way on RHEL/CentOS? We could work around the test failure, but if a user can run into this in normal use then we need to address this within gapply |
|
Yes, there is still the issue and this only fixes (avoids) the test failure. I believe running the code should reproduce the issue on both Mac and CentOS. What I don't get is that, when the number of fork executions is small, this is sometimes not reproduced (sometimes an increase in pipes was not even observed). The number of pipes decreases under certain conditions (it did not look related to time but to some events).

The root cause is: with one terminal watching the number of open pipes, and with another terminal running:

```r
for(i in 0:200) {
  p <- parallel:::mcfork()
  if (inherits(p, "masterProcess")) {
    tools::pskill(Sys.getpid(), tools::SIGUSR1)
    parallel:::mcexit(0L)
  }
}
```

the number of opened pipes just keeps increasing. I double-checked that the processes and sockets are closed. We need to resolve this one. |
|
For normal use cases, I suspect it might be fine because I executed 200 * ~10 tasks quickly on a single machine, but I don't know whether it happens frequently when jobs run slowly in a distributed cluster. At least, this was sometimes not reproduced when the number of fork executions was small. Practically it might be fine, but more investigation is needed if it is important to prioritize this issue. |
|
that's very interesting. that code has been around for 2 years - to be honest I'm not 100% sure about what it is doing. perhaps this is an existing leak in R when the socket is copied to a child fork. perhaps this could also be fixed with a lower number of partitions? again, I am actually not too concerned about the test failure itself - this workaround seems fine, and we could even skip the test on RHEL. I am very concerned about whether this is an ongoing memory leak that a user can run into over time, and the fact that it fails here consistently in tests is only a function of the number of operations performed so far and is not in any way confined to gapply |
|
Yes, I guess it will pass if we reduce the number of partitions.

With the R daemon enabled: this path looks to be tested on OSes other than Windows.

With the R daemon disabled: this path looks to be tested only on Windows.

This PR proposes to switch this test to the latter case (it was the former before) by avoiding calls to the R daemon that is already running from other executions in the tests. I am fine with giving a reduced number of partitions a shot if you prefer that. |
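For reference, a rough sketch of how the daemon-less path could be exercised from a SparkR test is shown below. This is only an illustration; the exact configuration key (`spark.sparkr.use.daemon`, assumed from the RRunner.scala snippet referenced earlier) and values are assumptions, not something this PR introduces.

```r
library(SparkR)

# Sketch: start a local SparkR session with the R daemon disabled, so each task
# forks a fresh worker process (the Windows-style path mentioned above).
# The configuration key below is an assumption based on RRunner.scala.
sparkR.session(master = "local[2]",
               sparkConfig = list(spark.sparkr.use.daemon = "false"))

# The same gapply pattern used in the PR description.
df <- createDataFrame(list(list(1L, 1, "1", 0.1)), c("a", "b", "c", "d"))
collect(gapply(df, "a", function(key, x) { x }, schema(df)))
```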
|
I suspect this is an issue in R. I will raise this issue with the R community soon and share it. |
|
@felixcheung, BTW, is this okay as a standalone PR as is? And actually, if this can't be fixed within … |
|
Does it fail by running just gapply and nothing else? I think this PR only works around the problem. I am concerned that a user can also run into this issue. A naive approach might be to change … thoughts? |
|
I was thinking we should not work around this for now if it can't be fixed within … Let's try to deal with … |
|
also, perhaps this is a bug in RHEL/CentOS? it seems like mcfork only calls the OS fork function |
|
This also happens on Mac for me. My assumption is … |
|
It looks like we should exit the children in the parent. Previously, each child sent the kill signal to itself. Killing the children from the parent looks like it resolves this problem properly.

Before:

```r
for(i in 0:200) {
  p <- parallel:::mcfork()
  if (inherits(p, "masterProcess")) {
    tools::pskill(Sys.getpid(), tools::SIGUSR1)
    parallel:::mcexit(0L)
  }
}
```

Roughly, after:

```r
for(i in 0:200) {
  p <- parallel:::mcfork()
  if (inherits(p, "masterProcess")) {
    parallel:::mcexit(0L)
  }
  exitChildren <- parallel:::selectChildren()
  if (is.integer(exitChildren)) {
    lapply(exitChildren, function(x) { tools::pskill(x, tools::SIGUSR1) })
  }
}
```
|
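To make the "kill in the parent" idea a bit more concrete in a daemon-like shape, here is a minimal sketch; the helper names are made up for illustration and this is not the actual daemon.R code.

```r
# Sketch: the parent reaps finished children before forking new ones, so the
# pipes opened by mcfork() are actually released.
reapFinishedChildren <- function() {
  finished <- parallel:::selectChildren(timeout = 0)
  if (is.integer(finished)) {
    lapply(finished, function(pid) tools::pskill(pid, tools::SIGUSR1))
  }
}

launchChild <- function() {
  p <- parallel:::mcfork()
  if (inherits(p, "masterProcess")) {
    # Child: do its work here, then exit; the parent reaps it later.
    parallel:::mcexit(0L)
  }
}

for (i in 0:200) {
  reapFinishedChildren()
  launchChild()
}
```

Run against the lsof check mentioned later in this thread, the open-pipe count should stay roughly flat rather than growing.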
R/pkg/inst/worker/daemon.R
Outdated
So, I intended to terminate the children every second, or before launching other workers.
The logic here can be tested as below.

With a terminal (terminal A) running the command below:

```sh
nc -l 7777
```

and with another terminal (terminal B) running the code below in an R shell:

```r
inputCon <- socketConnection(port = 7777, open = "rb", blocking = TRUE)
print(socketSelect(list(inputCon), timeout = 1))
```

This prints FALSE at first, as there is no data available yet. If any character is then typed in terminal A, running the socketSelect line again in terminal B returns TRUE.
@HyukjinKwon Do we know what the old behavior was doing? I'm wondering whether we would now spend more time in the loop below and be less responsive to the inputCon. One thing -- can we control the timeout based on whether there are any children running or not?
To make my point a bit clearer: if there are no workers running, we can block on this select and don't need the one-second timeout in that case?
Yea, I got your point and was taking a look. I think it is probably feasible via parallel:::children. I will give it a shot here and get back.
BTW, the old behaviour was simply to hang and wait there indefinitely.
https://stat.ethz.ch/R-manual/R-devel/library/base/html/socketSelect.html
> numeric or NULL. Time in seconds to wait for a socket to become available; NULL means wait indefinitely.
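A rough sketch of the idea being discussed here, i.e. deriving the select timeout from whether any children exist. The helper name is made up and this is not the exact daemon.R change.

```r
# Choose how long to block on the daemon's input socket: indefinitely (NULL)
# when there are no children to reap, otherwise wake up every second.
chooseSelectTimeout <- function() {
  if (length(parallel:::children()) == 0) NULL else 1
}

# Usage (assuming `inputCon` is the daemon's socket connection, as in the
# nc -l 7777 example above):
# ready <- socketSelect(list(inputCon), timeout = chooseSelectTimeout())
```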
R/pkg/inst/worker/daemon.R
Outdated
The timeout for selectChildren looks to be 0 by default, so it does not look like it hangs here.
This can be tested as below:

```sh
vi tmp.R
```

copy and paste

```r
p <- parallel:::mcfork()
if (inherits(p, "masterProcess")) {
  Sys.sleep(3L)
  parallel:::mcexit(0L)
}
print(parallel:::selectChildren())
Sys.sleep(4L)
print(parallel:::selectChildren())
```

and running `Rscript tmp.R` prints

```
[1] TRUE
[1] 2505
```

For the return values, please refer to https://stat.ethz.ch/R-manual/R-devel/library/parallel/html/children.html.
For testing the resource leak, this can be checked as below:

```r
for(i in 0:100000) {
  p <- parallel:::mcfork()
  if (inherits(p, "masterProcess")) {
    parallel:::mcexit(0L)
  }
  exitChildren <- parallel:::selectChildren()
  if (is.integer(exitChildren)) {
    lapply(exitChildren, function(x) { tools::pskill(x, tools::SIGUSR1) })
  }
}
```

The original code below:

```r
for(i in 0:100000) {
  p <- parallel:::mcfork()
  if (inherits(p, "masterProcess")) {
    tools::pskill(Sys.getpid(), tools::SIGUSR1)
    parallel:::mcexit(0L)
  }
}
```

will probably complain in mcfork due to the lack of resources (e.g., running out of file descriptors).

The actual number can be checked via `watch -n 0.01 "lsof -c R | wc -l"` in another terminal (again, to check this more precisely, it should be `-p PID` instead of `-c R`, after finding the PID via `ps -fe | grep /bin/exec/R`).
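On Linux, the same count can also be taken from inside R via /proc, which avoids guessing the right lsof filter. A sketch only; /proc is Linux-specific, so this will not work on macOS.

```r
# Count the open file descriptors of this R process (Linux only, via /proc).
countOpenFds <- function(pid = Sys.getpid()) {
  length(list.files(file.path("/proc", pid, "fd")))
}

before <- countOpenFds()
p <- parallel:::mcfork()
if (inherits(p, "masterProcess")) {
  parallel:::mcexit(0L)
}
after <- countOpenFds()
cat("fds before:", before, "after fork:", after, "\n")
```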
|
I tested this on Mac. I will double-check this, and whether it works on CentOS, on Monday. What do you think about the current approach, @felixcheung? |
|
Test build #78203 has finished for PR 18320 at commit
|
|
Test build #78204 has finished for PR 18320 at commit
|
|
Test build #78208 has finished for PR 18320 at commit
|
|
@felixcheung, I double-checked that the current state solves the issue on both CentOS and Mac. |
R/pkg/inst/worker/daemon.R
Outdated
mcexit alone does not terminate the child.

With terminal A:

```sh
R
```

With terminal B:

```sh
watch -n 0.01 "ps -fe | grep /bin/exec/R"
```

With terminal A:

```r
for(i in 0:100) {
  p <- parallel:::mcfork()
  if (inherits(p, "masterProcess")) {
    parallel:::mcexit(0L)
  }
}
```
BTW, to my knowledge, this should work on its own, at least in C (I didn't test it). Even if it is a bug in R and gets fixed in the future, I guess the current logic would still work as-is.

```r
arbitrary <- 9999
tools::pskill(arbitrary, tools::SIGUSR1)
```

https://stat.ethz.ch/R-manual/R-devel/library/tools/html/pskill.html

> it silently ignores invalid values of its arguments, including zero or negative pids.
|
cc @shivaram, could you take a look here too? I am being very careful with this change and want to make sure. |
felixcheung
left a comment
very cool, thank you for spending time to investigate.
agreed since this is so core to everything we should be careful with this change.
R/pkg/inst/worker/daemon.R
Outdated
why not do this under `if (ready) {`, similar to before?
Ah, yes. It looks confusing. Probably, let me add more comments around here. I intended to do the following:

A: When data (a port number via inputCon) arrives in daemon.R:

- Before this change, each child terminated itself.
- `ready` is `TRUE`, so the parent terminates the `worker.R` children that have finished their work.
- Then it launches other `worker.R` children.

B: When data (a port number via inputCon) has not arrived in daemon.R:

- Before this change, it hung at socketSelect(...) in this case; I introduced `timeout = 1` to handle it -
  https://stat.ethz.ch/R-manual/R-devel/library/base/html/socketSelect.html
  > numeric or NULL. Time in seconds to wait for a socket to become available; NULL means wait indefinitely.
- `ready` is `FALSE`, and the parent terminates the `worker.R` children that have finished their work, every second.

If we do this within `if (ready) {`, the child processes will remain until case A happens again. Forked children do some work, and if the parent checks right after forking, `parallel:::selectChildren()` will probably not return the finished children's PIDs.
The related test was performed here - #18320 (comment) and #18320 (comment)
ah got it.
could you add some comments in the code to document the behavior?
also, I wonder whether a delay in connecting back to the JVM would now cause the code to abort prematurely?
Definitely, I'll add comments.
Maybe I missed your point. To my knowledge, children only return their PID on exit, and parallel:::selectChildren() only returns the PIDs of children that finished their work and called parallel:::mcexit(0L) (the related test was done in #18320 (comment)). So even if connecting to the JVM is delayed in worker.R and RRunner.scala, it won't matter.
@felixcheung, I tested with the change in worker.R below:

```diff
 port <- as.integer(Sys.getenv("SPARKR_WORKER_PORT"))
+Sys.sleep(5L)
 inputCon <- socketConnection(
     port = port, blocking = TRUE, open = "rb", timeout = connectionTimeout)
 outputCon <- socketConnection(
```

It looks fine. Does this deal with your concern?
|
|
retest this please |
|
Test build #78517 has finished for PR 18320 at commit
|
|
Test build #78518 has finished for PR 18320 at commit
|
```r
  })
} else if (is.null(children)) {
  # If it is NULL, there are no such children. Waits indefinitely for a socket connecion.
  selectTimeout <- NULL
```
One path described above is:

- Every second, if no socket connection is available and there are child workers running.

So after forking succeeds, we want to do the check every second, but after one second, if no children sent data, we turn back immediately to waiting indefinitely. Doesn't that mean we don't actually run the logic every second, but only for the first second?
It becomes NULL only if there are no children. It returns TRUE if there are children and the timeout is reached. So, we turn back to waiting indefinitely only if there are no children at all.

https://stat.ethz.ch/R-manual/R-devel/library/parallel/html/children.html

> TRUE is the timeout was reached, FALSE if an error occurred (e.g., if the master process was interrupted) or an integer vector of process IDs with children that have data available, or NULL if there are no children.

Would this answer your question (or did I maybe misunderstand)?
It returns NULL if there are no children or there are no children that have data available?
I have this question because you have this comment:

> Only the process IDs of children sent data to the parent are returned below.

So I am not sure whether, if there are no children that have sent data to the parent, the returned value is NULL or something else?
I ran several tests for this in #18320 (comment) and #18320 (comment).

```sh
vi tmp.R
```

copy and paste

```r
p <- parallel:::mcfork()
if (inherits(p, "masterProcess")) {
  Sys.sleep(3L)
  parallel:::mcexit(0L)
}
print("TRUE - the timeout was reached")
print("parallel:::selectChildren(timeout = 0):")
print(parallel:::selectChildren(timeout = 0))
```

and ...

```sh
Rscript tmp.R
```

prints ...

```
[1] "TRUE - the timeout was reached"
[1] "parallel:::selectChildren(timeout = 0):"
[1] TRUE
```

If data is not available, to my knowledge this waits up to the timeout rather than returning NULL.
Oh, cool. LGTM.
nit: If it is NULL, there are no such children. -> If it is NULL, there are no any children.?
Definitely.
|
Test build #78519 has finished for PR 18320 at commit
|
|
LGTM |
|
Test build #78523 has finished for PR 18320 at commit
|
felixcheung
left a comment
ok LGTM again.
@shivaram there's a fair bit of change since your last LGTM, do you want to take a look?
|
Thanks for the close look. I am pretty confident in the current state now. |
|
LGTM. The update looks good. Thanks for the thorough testing. We could also post a note on the dev list about this change and especially ask people who use |
|
merged to master, thanks!! |
|
I think I am now familiar with the symptoms here, as I have tested many cases. Let me try to organise them and send a post to the dev mailing list. |
|
Hmmmm. I suspect this one, in particular the last non-comment change, occasionally causes the unknown test failure with the -10 error code, if nothing else happened to Jenkins. I believe the -10 error started around this time. |
|
Meanwhile, let me try to find the root cause. |
|
Let me leave a link to #18456 for the related discussion. |
…daemon to prevent a leak

## What changes were proposed in this pull request?

`mcfork` in R looks like it opens a pipe ahead of time, but the existing logic does not properly close it when it is executed hot. This leads to the failure of further forking due to the limit on the number of open files. This hot execution happens particularly with `gapply`/`gapplyCollect`. For an unknown reason, this happens more easily on CentOS and can be reproduced on Mac too.

All the details are described in https://issues.apache.org/jira/browse/SPARK-21093

This PR proposes simply to terminate R's worker processes in the parent of R's daemon to prevent a leak.

## How was this patch tested?

I ran the code below on both CentOS and Mac with that configuration disabled/enabled.

```r
df <- createDataFrame(list(list(1L, 1, "1", 0.1)), c("a", "b", "c", "d"))
collect(gapply(df, "a", function(key, x) { x }, schema(df)))
collect(gapply(df, "a", function(key, x) { x }, schema(df)))
...  # 30 times
```

Also, it now passes R tests on CentOS as below:

```
SparkSQL functions: Spark package found in SPARK_HOME: .../spark
..............................................................................................................................................................
..............................................................................................................................................................
..............................................................................................................................................................
..............................................................................................................................................................
..............................................................................................................................................................
....................................................................................................................................
```

Author: hyukjinkwon <[email protected]>

Closes apache#18320 from HyukjinKwon/SPARK-21093.
…daemon to prevent a leak

## What changes were proposed in this pull request?

This is a retry for #18320. That PR was reverted due to unexpected test failures with a -10 error code.

I was unable to reproduce it on MacOS, CentOS and Ubuntu, only on Jenkins. So, the tests proceeded to verify this, and the past try was reverted here - #18456

This new approach was tested in #18463.

**Test results**:

- With the suspicious part of the change from the past try (466325d): tests ran 4 times, 2 times passed and 2 times failed.
- Without the suspicious part of the change from the past try (466325d): tests ran 5 times and they all passed.
- With this new approach (0a7589c): tests ran 5 times and they all passed.

It looks like the cause is as below (see 466325d):

```diff
+ exitCode <- 1
...
+ data <- parallel:::readChild(child)
+ if (is.raw(data)) {
+   if (unserialize(data) == exitCode) {
...
+   }
+ }
...
- parallel:::mcexit(0L)
+ parallel:::mcexit(0L, send = exitCode)
```

Two possibilities I think:

- `parallel:::mcexit(.., send = exitCode)`

  https://stat.ethz.ch/R-manual/R-devel/library/parallel/html/mcfork.html

  > It sends send to the master (unless NULL) and then shuts down the child process.

  However, it looks possible that the parent attempts to terminate the child right after getting our custom exit code. So, the child gets terminated between "send" and "shuts down", failing to exit properly.

- A bug between `parallel:::mcexit(..., send = ...)` and `parallel:::readChild`.

**Proposal**:

To resolve this, I simply decided to avoid both possibilities with this new approach here (9ff89a7). To support this idea, I quote from the documentation:

https://stat.ethz.ch/R-manual/R-devel/library/parallel/html/mcfork.html

> `readChild` and `readChildren` return a raw vector with a "pid" attribute if data were available, an integer vector of length one with the process ID if a child terminated or `NULL` if the child no longer exists (no children at all for `readChildren`).

`readChild` returns "an integer vector of length one with the process ID if a child terminated", so we can check whether the return value is an `integer` equal to the selected process ID. I believe this makes sure that the children have exited. In case children happen to send any data manually to the parent (which is why the suspicious part of the change (466325d) was introduced), it will be raw bytes and will be discarded (and then the loop will try to read the next value and check whether it is an `integer`).

## How was this patch tested?

Manual tests and Jenkins tests.

Author: hyukjinkwon <[email protected]>

Closes #18465 from HyukjinKwon/SPARK-21093-retry-1.
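For clarity, here is a tiny standalone illustration of the `readChild` behaviour quoted above. It is an observation sketch, not the daemon.R code itself.

```r
# Fork a child that exits without sending anything, then observe what
# parallel:::readChild() reports in the parent.
child <- parallel:::mcfork()
if (inherits(child, "masterProcess")) {
  parallel:::mcexit(0L)  # child exits; sends nothing back
}

repeat {
  data <- parallel:::readChild(child)
  if (is.raw(data)) {
    next                                 # data sent manually by the child; discard
  }
  if (is.integer(data)) {
    cat("child", data, "terminated\n")   # integer PID => the child terminated
  }
  break                                  # integer (terminated) or NULL (gone)
}
```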
* [SPARK-21228][SQL] InSet incorrect handling of structs ## What changes were proposed in this pull request? When data type is struct, InSet now uses TypeUtils.getInterpretedOrdering (similar to EqualTo) to build a TreeSet. In other cases it will use a HashSet as before (which should be faster). Similarly, In.eval uses Ordering.equiv instead of equals. ## How was this patch tested? New test in SQLQuerySuite. Author: Bogdan Raducanu <[email protected]> Closes #18455 from bogdanrdc/SPARK-21228. * [SPARK-21204][SQL] Add support for Scala Set collection types in serialization ## What changes were proposed in this pull request? Currently we can't produce a `Dataset` containing `Set` in SparkSQL. This PR tries to support serialization/deserialization of `Set`. Because there's no corresponding internal data type in SparkSQL for a `Set`, the most proper choice for serializing a set should be an array. ## How was this patch tested? Added unit tests. Author: Liang-Chi Hsieh <[email protected]> Closes #18416 from viirya/SPARK-21204. * [SPARK-21323][SQL] Rename plans.logical.statsEstimation.Range to ValueInterval ## What changes were proposed in this pull request? Rename org.apache.spark.sql.catalyst.plans.logical.statsEstimation.Range to ValueInterval. The current naming is identical to logical operator "range". Refactoring it to ValueInterval is more accurate. ## How was this patch tested? unit test Please review http://spark.apache.org/contributing.html before opening a pull request. Author: Wang Gengliang <[email protected]> Closes #18549 from gengliangwang/ValueInterval. * [SPARK-21267][SS][DOCS] Update Structured Streaming Documentation ## What changes were proposed in this pull request? Few changes to the Structured Streaming documentation - Clarify that the entire stream input table is not materialized - Add information for Ganglia - Add Kafka Sink to the main docs - Removed a couple of leftover experimental tags - Added more associated reading material and talk videos. In addition, https://github.com/apache/spark/pull/16856 broke the link to the RDD programming guide in several places while renaming the page. This PR fixes those sameeragarwal cloud-fan. - Added a redirection to avoid breaking internal and possible external links. - Removed unnecessary redirection pages that were there since the separate scala, java, and python programming guides were merged together in 2013 or 2014. ## How was this patch tested? (Please explain how this patch was tested. E.g. unit tests, integration tests, manual tests) (If this patch involves UI changes, please attach a screenshot; otherwise, remove this) Please review http://spark.apache.org/contributing.html before opening a pull request. Author: Tathagata Das <[email protected]> Closes #18485 from tdas/SPARK-21267. * [SPARK-20946][SQL] Do not update conf for existing SparkContext in SparkSession.getOrCreate ## What changes were proposed in this pull request? SparkContext is shared by all sessions, we should not update its conf for only one session. ## How was this patch tested? existing tests Author: Wenchen Fan <[email protected]> Closes #18536 from cloud-fan/config. * [SPARK-21329][SS] Make EventTimeWatermarkExec explicitly UnaryExecNode ## What changes were proposed in this pull request? Making EventTimeWatermarkExec explicitly UnaryExecNode /cc tdas zsxwing ## How was this patch tested? Local build. Author: Jacek Laskowski <[email protected]> Closes #18509 from jaceklaskowski/EventTimeWatermarkExec-UnaryExecNode. 
* [SPARK-21326][SPARK-21066][ML] Use TextFileFormat in LibSVMFileFormat and allow multiple input paths for determining numFeatures ## What changes were proposed in this pull request? This is related with [SPARK-19918](https://issues.apache.org/jira/browse/SPARK-19918) and [SPARK-18362](https://issues.apache.org/jira/browse/SPARK-18362). This PR proposes to use `TextFileFormat` and allow multiple input paths (but with a warning) when determining the number of features in LibSVM data source via an extra scan. There are three points here: - The main advantage of this change should be to remove file-listing bottlenecks in driver side. - Another advantage is ones from using `FileScanRDD`. For example, I guess we can use `spark.sql.files.ignoreCorruptFiles` option when determining the number of features. - We can unify the schema inference code path in text based data sources. This is also a preparation for [SPARK-21289](https://issues.apache.org/jira/browse/SPARK-21289). ## How was this patch tested? Unit tests in `LibSVMRelationSuite`. Closes #18288 Author: hyukjinkwon <[email protected]> Closes #18556 from HyukjinKwon/libsvm-schema. * [SPARK-21327][SQL][PYSPARK] ArrayConstructor should handle an array of typecode 'l' as long rather than int in Python 2. ## What changes were proposed in this pull request? Currently `ArrayConstructor` handles an array of typecode `'l'` as `int` when converting Python object in Python 2 into Java object, so if the value is larger than `Integer.MAX_VALUE` or smaller than `Integer.MIN_VALUE` then the overflow occurs. ```python import array data = [Row(longarray=array.array('l', [-9223372036854775808, 0, 9223372036854775807]))] df = spark.createDataFrame(data) df.show(truncate=False) ``` ``` +----------+ |longarray | +----------+ |[0, 0, -1]| +----------+ ``` This should be: ``` +----------------------------------------------+ |longarray | +----------------------------------------------+ |[-9223372036854775808, 0, 9223372036854775807]| +----------------------------------------------+ ``` ## How was this patch tested? Added a test and existing tests. Author: Takuya UESHIN <[email protected]> Closes #18553 from ueshin/issues/SPARK-21327. * [SPARK-21217][SQL] Support ColumnVector.Array.to<type>Array() ## What changes were proposed in this pull request? This PR implements bulk-copy for `ColumnVector.Array.to<type>Array()` methods (e.g. `toIntArray()`) in `ColumnVector.Array` by using `System.arrayCopy()` or `Platform.copyMemory()`. Before this PR, when one of these method is called, the generic method in `ArrayData` is called. It is not fast since element-wise copy is performed. This PR can improve performance of a benchmark program by 1.9x and 3.2x. 
Without this PR ``` OpenJDK 64-Bit Server VM 1.8.0_131-8u131-b11-0ubuntu1.16.04.2-b11 on Linux 4.4.0-66-generic Intel(R) Xeon(R) CPU E5-2667 v3 3.20GHz Int Array Best/Avg Time(ms) Rate(M/s) Per Row(ns) ------------------------------------------------------------------------------------------------ ON_HEAP 586 / 628 14.3 69.9 OFF_HEAP 893 / 902 9.4 106.5 ``` With this PR ``` OpenJDK 64-Bit Server VM 1.8.0_131-8u131-b11-0ubuntu1.16.04.2-b11 on Linux 4.4.0-66-generic Intel(R) Xeon(R) CPU E5-2667 v3 3.20GHz Int Array Best/Avg Time(ms) Rate(M/s) Per Row(ns) ------------------------------------------------------------------------------------------------ ON_HEAP 306 / 331 27.4 36.4 OFF_HEAP 282 / 287 29.8 33.6 ``` Source program ``` (MemoryMode.ON_HEAP :: MemoryMode.OFF_HEAP :: Nil).foreach { memMode => { val len = 8 * 1024 * 1024 val column = ColumnVector.allocate(len * 2, new ArrayType(IntegerType, false), memMode) val data = column.arrayData var i = 0 while (i < len) { data.putInt(i, i) i += 1 } column.putArray(0, 0, len) val benchmark = new Benchmark("Int Array", len, minNumIters = 20) benchmark.addCase(s"$memMode") { iter => var i = 0 while (i < 50) { column.getArray(0).toIntArray i += 1 } } benchmark.run }} ``` ## How was this patch tested? Added test suite Author: Kazuaki Ishizaki <[email protected]> Closes #18425 from kiszk/SPARK-21217. * [SPARK-20703][SQL][FOLLOW-UP] Associate metrics with data writes onto DataFrameWriter operations ## What changes were proposed in this pull request? Remove time metrics since it seems no way to measure it in non per-row tracking. ## How was this patch tested? Existing tests. Please review http://spark.apache.org/contributing.html before opening a pull request. Author: Liang-Chi Hsieh <[email protected]> Closes #18558 from viirya/SPARK-20703-followup. * [SPARK-21313][SS] ConsoleSink's string representation ## What changes were proposed in this pull request? Add `toString` with options for `ConsoleSink` so it shows nicely in query progress. **BEFORE** ``` "sink" : { "description" : "org.apache.spark.sql.execution.streaming.ConsoleSink4b340441" } ``` **AFTER** ``` "sink" : { "description" : "ConsoleSink[numRows=10, truncate=false]" } ``` /cc zsxwing tdas ## How was this patch tested? Local build Author: Jacek Laskowski <[email protected]> Closes #18539 from jaceklaskowski/SPARK-21313-ConsoleSink-toString. * [SPARK-21285][ML] VectorAssembler reports the column name of unsupported data type ## What changes were proposed in this pull request? add the column name in the exception which is raised by unsupported data type. ## How was this patch tested? + [x] pass all tests. Author: Yan Facai (颜发才) <[email protected]> Closes #18523 from facaiy/ENH/vectorassembler_add_col. * [SPARK-21335][SQL] support un-aliased subquery ## What changes were proposed in this pull request? un-aliased subquery is supported by Spark SQL for a long time. Its semantic was not well defined and had confusing behaviors, and it's not a standard SQL syntax, so we disallowed it in https://issues.apache.org/jira/browse/SPARK-20690 . However, this is a breaking change, and we do have existing queries using un-aliased subquery. We should add the support back and fix its semantic. This PR fixes the un-aliased subquery by assigning a default alias name. After this PR, there is no syntax change from branch 2.2 to master, but we invalid a weird use case: `SELECT v.i from (SELECT i FROM v)`. 
Now this query will throw analysis exception because users should not be able to use the qualifier inside a subquery. ## How was this patch tested? new regression test Author: Wenchen Fan <[email protected]> Closes #18559 from cloud-fan/sub-query. * [SPARK-19358][CORE] LiveListenerBus shall log the event name when dropping them due to a fully filled queue ## What changes were proposed in this pull request? Some dropped event will make the whole application behaves unexpectedly, e.g. some UI problem...we shall log the dropped event name to facilitate the debugging ## How was this patch tested? Existing tests Author: CodingCat <[email protected]> Closes #16697 from CodingCat/SPARK-19358. * [SPARK-21336] Revise rand comparison in BatchEvalPythonExecSuite ## What changes were proposed in this pull request? Revise rand comparison in BatchEvalPythonExecSuite In BatchEvalPythonExecSuite, there are two cases using the case "rand() > 3" Rand() generates a random value in [0, 1), it is wired to be compared with 3, use 0.3 instead ## How was this patch tested? unit test Please review http://spark.apache.org/contributing.html before opening a pull request. Author: Wang Gengliang <[email protected]> Closes #18560 from gengliangwang/revise_BatchEvalPythonExecSuite. * [SPARK-21100][SQL] Add summary method as alternative to describe that gives quartiles similar to Pandas ## What changes were proposed in this pull request? Adds method `summary` that allows user to specify which statistics and percentiles to calculate. By default it include the existing statistics from `describe` and quartiles (25th, 50th, and 75th percentiles) similar to Pandas. Also changes the implementation of `describe` to delegate to `summary`. ## How was this patch tested? additional unit test Author: Andrew Ray <[email protected]> Closes #18307 from aray/SPARK-21100. * [SPARK-21281][SQL] Use string types by default if array and map have no argument ## What changes were proposed in this pull request? This pr modified code to use string types by default if `array` and `map` in functions have no argument. This behaviour is the same with Hive one; ``` hive> CREATE TEMPORARY TABLE t1 AS SELECT map(); hive> DESCRIBE t1; _c0 map<string,string> hive> CREATE TEMPORARY TABLE t2 AS SELECT array(); hive> DESCRIBE t2; _c0 array<string> ``` ## How was this patch tested? Added tests in `DataFrameFunctionsSuite`. Author: Takeshi Yamamuro <[email protected]> Closes #18516 from maropu/SPARK-21281. * [SPARK-20379][CORE] Allow SSL config to reference env variables. This change exposes the internal code path in SparkConf that allows configs to be read with variable substitution applied, and uses that new method in SSLOptions so that SSL configs can reference other variables, and more importantly, environment variables, providing a secure way to provide passwords to Spark when using SSL. The approach is a little bit hacky, but is the smallest change possible. Otherwise, the concept of "namespaced configs" would have to be added to the config system, which would create a lot of noise for not much gain at this point. Tested with added unit tests, and on a real cluster with SSL enabled. Author: Marcelo Vanzin <[email protected]> Closes #18394 from vanzin/SPARK-20379.try2. * [SPARK-21069][SS][DOCS] Add rate source to programming guide. ## What changes were proposed in this pull request? SPARK-20979 added a new structured streaming source: Rate source. This patch adds the corresponding documentation to programming guide. ## How was this patch tested? 
Tested by running jekyll locally. Author: Prashant Sharma <[email protected]> Author: Prashant Sharma <[email protected]> Closes #18562 from ScrapCodes/spark-21069/rate-source-docs. * [SPARK-20307][SPARKR] SparkR: pass on setHandleInvalid to spark.mllib functions that use StringIndexer ## What changes were proposed in this pull request? For randomForest classifier, if test data contains unseen labels, it will throw an error. The StringIndexer already has the handleInvalid logic. The patch add a new method to set the underlying StringIndexer handleInvalid logic. This patch should also apply to other classifiers. This PR focuses on the main logic and randomForest classifier. I will do follow-up PR for other classifiers. ## How was this patch tested? Add a new unit test based on the error case in the JIRA. Author: wangmiao1981 <[email protected]> Closes #18496 from wangmiao1981/handle. * [SPARK-20456][DOCS] Add examples for functions collection for pyspark ## What changes were proposed in this pull request? This adds documentation to many functions in pyspark.sql.functions.py: `upper`, `lower`, `reverse`, `unix_timestamp`, `from_unixtime`, `rand`, `randn`, `collect_list`, `collect_set`, `lit` Add units to the trigonometry functions. Renames columns in datetime examples to be more informative. Adds links between some functions. ## How was this patch tested? `./dev/lint-python` `python python/pyspark/sql/functions.py` `./python/run-tests.py --module pyspark-sql` Author: Michael Patterson <[email protected]> Closes #17865 from map222/spark-20456. * Mesos doc fixes ## What changes were proposed in this pull request? Some link fixes for the documentation [Running Spark on Mesos](https://spark.apache.org/docs/latest/running-on-mesos.html): * Updated Link to Mesos Frameworks (Projects built on top of Mesos) * Update Link to Mesos binaries from Mesosphere (former link was redirected to dcos install page) ## How was this patch tested? Documentation was built and changed page manually/visually inspected. No code was changed, hence no dev tests. Since these changes are rather trivial I did not open a new JIRA ticket. Please review http://spark.apache.org/contributing.html before opening a pull request. Author: Joachim Hereth <[email protected]> Closes #18564 from daten-kieker/mesos_doc_fixes. * [SPARK-20609][MLLIB][TEST] manually cleared 'spark.local.dir' before/after a test in ALSCleanerSuite ## What changes were proposed in this pull request? This PR is similar to #17869. Once` 'spark.local.dir'` is set. Unless this is manually cleared before/after a test. it could return the same directory even if this property is configured. and add before/after for each likewise in ALSCleanerSuite. ## How was this patch tested? existing test. Author: caoxuewen <[email protected]> Closes #18537 from heary-cao/ALSCleanerSuite. * [SPARK-21345][SQL][TEST][TEST-MAVEN] SparkSessionBuilderSuite should clean up stopped sessions. ## What changes were proposed in this pull request? `SparkSessionBuilderSuite` should clean up stopped sessions. Otherwise, it leaves behind some stopped `SparkContext`s interfereing with other test suites using `ShardSQLContext`. Recently, master branch fails consequtively. - https://amplab.cs.berkeley.edu/jenkins/view/Spark%20QA%20Test%20(Dashboard)/ ## How was this patch tested? Pass the Jenkins with a updated suite. Author: Dongjoon Hyun <[email protected]> Closes #18567 from dongjoon-hyun/SPARK-SESSION. 
* [SPARK-21093][R] Terminate R's worker processes in the parent of R's daemon to prevent a leak

## What changes were proposed in this pull request?

This is a retry for #18320. That PR was reverted due to unexpected test failures with a -10 error code. I was unable to reproduce them on macOS, CentOS and Ubuntu, only on Jenkins. So, tests were run to verify this and the past try was reverted here - https://github.com/apache/spark/pull/18456. This new approach was tested in https://github.com/apache/spark/pull/18463.

**Test results**:

- With the suspicious part of the change from the past try (https://github.com/apache/spark/pull/18463/commits/466325d3fd353668583f3bde38ae490d9db0b189): tests ran 4 times, 2 passed and 2 failed.
- Without the suspicious part of the change from the past try (https://github.com/apache/spark/pull/18463/commits/466325d3fd353668583f3bde38ae490d9db0b189): tests ran 5 times and all passed.
- With this new approach (https://github.com/apache/spark/pull/18463/commits/0a7589c09f53dfc2094497d8d3e59d6407569417): tests ran 5 times and all passed.

It looks like the cause is as below (see https://github.com/apache/spark/pull/18463/commits/466325d3fd353668583f3bde38ae490d9db0b189):

```diff
+ exitCode <- 1
...
+ data <- parallel:::readChild(child)
+ if (is.raw(data)) {
+   if (unserialize(data) == exitCode) {
...
+   }
+ }
...
- parallel:::mcexit(0L)
+ parallel:::mcexit(0L, send = exitCode)
```

Two possibilities I see:

- `parallel:::mcexit(..., send = exitCode)`

  https://stat.ethz.ch/R-manual/R-devel/library/parallel/html/mcfork.html

  > It sends send to the master (unless NULL) and then shuts down the child process.

  However, it looks possible that the parent attempts to terminate the child right after getting our custom exit code. So, the child gets terminated between "send" and "shuts down", failing to exit properly.

- A bug between `parallel:::mcexit(..., send = ...)` and `parallel:::readChild`.

**Proposal**: To resolve this, I simply decided to avoid both possibilities with this new approach here (https://github.com/apache/spark/pull/18465/commits/9ff89a7859cb9f427fc774f33c3521c7d962b723). To support this idea, I quote the documentation below:

https://stat.ethz.ch/R-manual/R-devel/library/parallel/html/mcfork.html

> `readChild` and `readChildren` return a raw vector with a "pid" attribute if data were available, an integer vector of length one with the process ID if a child terminated or `NULL` if the child no longer exists (no children at all for `readChildren`).

`readChild` returns "an integer vector of length one with the process ID if a child terminated", so we can check whether the return value is an `integer` equal to the selected process ID. I believe this makes sure that the children have exited. In case the children happen to send any data manually to the parent (which is why the suspicious part of the change (https://github.com/apache/spark/pull/18463/commits/466325d3fd353668583f3bde38ae490d9db0b189) was introduced), that data will be raw bytes and will be discarded, and the loop will read again and check for an `integer` on the next iteration.

## How was this patch tested?

Manual tests and Jenkins tests.

Author: hyukjinkwon <[email protected]>

Closes #18465 from HyukjinKwon/SPARK-21093-retry-1.
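To make the `readChild`-based check above concrete, here is a minimal illustrative sketch of the loop the proposal describes. It is not the actual SparkR daemon code; it only shows, assuming the internal `parallel:::mcfork`/`parallel:::readChild` functions behave as quoted above, how a parent can confirm that a forked worker has terminated while discarding any data the worker happens to send:

```r
# Illustration only (not SparkR's daemon.R): fork one worker and have the
# parent wait until parallel:::readChild() reports that this worker exited.
worker <- parallel:::mcfork()

if (inherits(worker, "masterProcess")) {
  # Child (worker) side: do some work here, then shut the forked process down.
  parallel:::mcexit(0L)
}

# Parent (daemon) side: 'worker$pid' is the forked worker's process id.
repeat {
  data <- parallel:::readChild(worker)
  if (is.null(data)) {
    break  # the child no longer exists
  }
  if (is.integer(data) && data[1] == worker$pid) {
    break  # readChild() returned the pid: the worker has terminated
  }
  # Otherwise 'data' is a raw vector the child sent; discard it and read again.
}
```

The key point is that the loop only exits once `readChild` returns the worker's process ID (or `NULL`), so the parent never leaves a terminated child unreaped.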
What changes were proposed in this pull request?
`mcfork` in R appears to open a pipe ahead of forking, but the existing logic does not properly close it when forking is executed hot (repeatedly in quick succession). This eventually makes further forking fail because the limit on the number of open files is reached. This hot execution path is hit particularly by `gapply`/`gapplyCollect`. For an unknown reason, this happens more easily on CentOS, but it can be reproduced on macOS too. All the details are described in https://issues.apache.org/jira/browse/SPARK-21093
This PR proposes simply to terminate R's worker processes in the parent of R's daemon to prevent a leak.
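For reference, one rough way to watch the symptom described above is to count the open file descriptors of the daemon (or worker) process between runs. This is only an illustrative helper, assuming a Linux-style `/proc` filesystem and a hypothetical pid; on macOS something like `lsof -p <pid> | wc -l` gives a similar count.

```r
# Illustration only: count open file descriptors of a process via /proc.
# If the pipe opened by mcfork is leaked, this number keeps growing as
# gapply()/gapplyCollect() is executed repeatedly.
count_open_fds <- function(pid) {
  length(list.files(file.path("/proc", pid, "fd")))
}

# Hypothetical usage with the daemon's pid:
# count_open_fds(12345)
```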
How was this patch tested?
I ran the code below on both CentOS and macOS with that configuration disabled and enabled.
Also, the R tests now pass on CentOS, as shown below: