-
Notifications
You must be signed in to change notification settings - Fork 28.9k
[SPARK-23942][PYTHON][SQL][BRANCH-2.3] Makes collect in PySpark as action for a query executor listener #21060
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
[SPARK-23942][PYTHON][SQL][BRANCH-2.3] Makes collect in PySpark as action for a query executor listener #21060
Conversation
…uery executor listener
This PR proposes to add `collect` to a query executor as an action.
Seems `collect` / `collect` with Arrow are not recognised via `QueryExecutionListener` as an action. For example, if we have a custom listener as below:
```scala
package org.apache.spark.sql
import org.apache.spark.internal.Logging
import org.apache.spark.sql.execution.QueryExecution
import org.apache.spark.sql.util.QueryExecutionListener
class TestQueryExecutionListener extends QueryExecutionListener with Logging {
override def onSuccess(funcName: String, qe: QueryExecution, durationNs: Long): Unit = {
logError("Look at me! I'm 'onSuccess'")
}
override def onFailure(funcName: String, qe: QueryExecution, exception: Exception): Unit = { }
}
```
and set `spark.sql.queryExecutionListeners` to `org.apache.spark.sql.TestQueryExecutionListener`
Other operations in PySpark or Scala side seems fine:
```python
>>> sql("SELECT * FROM range(1)").show()
```
```
18/04/09 17:02:04 ERROR TestQueryExecutionListener: Look at me! I'm 'onSuccess'
+---+
| id|
+---+
| 0|
+---+
```
```scala
scala> sql("SELECT * FROM range(1)").collect()
```
```
18/04/09 16:58:41 ERROR TestQueryExecutionListener: Look at me! I'm 'onSuccess'
res1: Array[org.apache.spark.sql.Row] = Array([0])
```
but ..
**Before**
```python
>>> sql("SELECT * FROM range(1)").collect()
```
```
[Row(id=0)]
```
```python
>>> spark.conf.set("spark.sql.execution.arrow.enabled", "true")
>>> sql("SELECT * FROM range(1)").toPandas()
```
```
id
0 0
```
**After**
```python
>>> sql("SELECT * FROM range(1)").collect()
```
```
18/04/09 16:57:58 ERROR TestQueryExecutionListener: Look at me! I'm 'onSuccess'
[Row(id=0)]
```
```python
>>> spark.conf.set("spark.sql.execution.arrow.enabled", "true")
>>> sql("SELECT * FROM range(1)").toPandas()
```
```
18/04/09 17:53:26 ERROR TestQueryExecutionListener: Look at me! I'm 'onSuccess'
id
0 0
```
I have manually tested as described above and unit test was added.
Author: hyukjinkwon <[email protected]>
Closes apache#21007 from HyukjinKwon/SPARK-23942.
(cherry picked from commit ab7b961)
Signed-off-by: hyukjinkwon <[email protected]>
|
cc @BryanCutler |
| ReusedPySparkTestCase.tearDownClass() | ||
| cls.spark.stop() | ||
|
|
||
| def assertPandasEqual(self, expected, result): |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This method causes a conflict which I don't really understand why. I compared line by line, character by character and they look identical.
|
Test build #89312 has finished for PR 21060 at commit
|
|
retest this please |
|
Test build #89327 has finished for PR 21060 at commit
|
|
retest this please |
BryanCutler
left a comment
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
LGTM, pending Jenkins
|
Test build #89352 has finished for PR 21060 at commit
|
|
retest this please |
|
Test build #89363 has finished for PR 21060 at commit
|
|
Merged to branch-2.3. Thanks for reviewing this @BryanCutler. |
…tion for a query executor listener
## What changes were proposed in this pull request?
This PR proposes to add `collect` to a query executor as an action.
Seems `collect` / `collect` with Arrow are not recognised via `QueryExecutionListener` as an action. For example, if we have a custom listener as below:
```scala
package org.apache.spark.sql
import org.apache.spark.internal.Logging
import org.apache.spark.sql.execution.QueryExecution
import org.apache.spark.sql.util.QueryExecutionListener
class TestQueryExecutionListener extends QueryExecutionListener with Logging {
override def onSuccess(funcName: String, qe: QueryExecution, durationNs: Long): Unit = {
logError("Look at me! I'm 'onSuccess'")
}
override def onFailure(funcName: String, qe: QueryExecution, exception: Exception): Unit = { }
}
```
and set `spark.sql.queryExecutionListeners` to `org.apache.spark.sql.TestQueryExecutionListener`
Other operations in PySpark or Scala side seems fine:
```python
>>> sql("SELECT * FROM range(1)").show()
```
```
18/04/09 17:02:04 ERROR TestQueryExecutionListener: Look at me! I'm 'onSuccess'
+---+
| id|
+---+
| 0|
+---+
```
```scala
scala> sql("SELECT * FROM range(1)").collect()
```
```
18/04/09 16:58:41 ERROR TestQueryExecutionListener: Look at me! I'm 'onSuccess'
res1: Array[org.apache.spark.sql.Row] = Array([0])
```
but ..
**Before**
```python
>>> sql("SELECT * FROM range(1)").collect()
```
```
[Row(id=0)]
```
```python
>>> spark.conf.set("spark.sql.execution.arrow.enabled", "true")
>>> sql("SELECT * FROM range(1)").toPandas()
```
```
id
0 0
```
**After**
```python
>>> sql("SELECT * FROM range(1)").collect()
```
```
18/04/09 16:57:58 ERROR TestQueryExecutionListener: Look at me! I'm 'onSuccess'
[Row(id=0)]
```
```python
>>> spark.conf.set("spark.sql.execution.arrow.enabled", "true")
>>> sql("SELECT * FROM range(1)").toPandas()
```
```
18/04/09 17:53:26 ERROR TestQueryExecutionListener: Look at me! I'm 'onSuccess'
id
0 0
```
## How was this patch tested?
I have manually tested as described above and unit test was added.
Author: hyukjinkwon <[email protected]>
Closes #21060 from HyukjinKwon/PR_TOOL_PICK_PR_21007_BRANCH-2.3.
|
Since this is not a bug fix, I plan to revert this PR. WDYT? @HyukjinKwon @BryanCutler |
|
hm I would say it's a bug since the action is not detected which is supposed to call the callback. The test is a bit complicated but the fix is relatively straightforward. |
|
This will introduce the behavior change and it is not a regression. The changes we made in this PR could break the external app. We should not do it in the maintenance release. |
|
I guess the behaviour changes here is that a custom query execution listener now can recognise the action |
|
Users apps should not be blamed in this case. If they want this change, they should upgrade to the newer release. Basically, we should not introduce any external behavior change in the maintenance release if possible. |
|
I am a bit puzzled because I agree that it's discouraged to make a behaviour change to the maintenance release, sure. However, I was thinking it makes sense to backport if the fix is not complicated and looks a bug quite clearly. I think we shouldn't say it's improvement in this case. Were actual apps or test cases broken somewhere? |
|
This is just the basic backport rule we follow for each PR. We should not make an exception for this PR. |
|
I agree that It's better to avoid a behaviour change but this one is a clearly a bug and the fix is straightforward. I am puzzled why this specifically prompted you. I wouldn't revert if there's not specific worry about this patch. |
|
If this can be treated as a bug to backport, we have many behavior change PRs that can be backported. We are building the system software. We have to be more principled. |
|
How about we formally document that in the guide? I have been always putting more importance on practice and I personally think we are fine to make a backport if it's a bug and the fix is straightforward. IMHO, principal is a base but we should put more importance on practice. Even if I take your words, I would then like to make this as an exception since this fixes actual usecases from our customers. |
|
Yup, that should reduce some overhead like this. I would like to listen what you guys think cc @srowen, @vanzin, @felixcheung, @holdenk too. |
|
This certainly looks like a bug fix. I don't know this area well, but I don't see an argument here that the current behavior is correct. Right? When we say we don't back-port behavior changes, we mean "changes in what is meant to be correct behavior". All bug fixes change behavior, but to restore correct behavior. So I don't see an argument against back-porting because it's a behavior change. Of course, sometimes practical concerns override that. If we thought programs were relying on the 'wrong' behavior then we'd have to think twice about correcting it. I don't see that argument being made here, but, I'm not sure? There is evidence the 'wrong' behavior is impacting users though? @gatorsmile I must say I don't understand your position here, can you clarify? So far standard practice here says this is a reasonable backport. What's different here? |
|
We hit the similar issue in #18064. At that time, we did not backport the PR to the previous releases too. Thus, I do not think we should make an exception for this PR just because the customers of @HyukjinKwon hit this issue. If we make an exception, it becomes harder to decide which PRs are qualified for a backport. We need to be very careful when backporting the PR with the behavior changes, especially when this is neither a critical issue nor a regression. Thus, I do not think we should backport this PR. |
The callback works for
That's because the change was big and invasive. I wouldn't backport it too; however, this fix is relatively small.
It's not because my customers but I am saying it fixes an actual usecase and it affects actual users.
I think we usually use committer's judgement when we make an exception. I already have been seeing many backports that actually causes behaviour changes and I did this because it looks being backported in general. This is the reason why we should formally document it if this is actually the rule. What I am less sure is, why this one specifically prompted you. |
I am not saying we shouldn't be careful but affects actual user group and actual scenarios. |
The behavior inconsistency among Python/Scala/R/JAVA does not mean a bug, right?
Too big and invasive is not the reason why we did not backport that PR. We still can backport the minimal changes to the previous releases.
I am not against this specific PR. All the committers need to be really careful when they make a decision to backport a behavior change. If any committer does it, we should jump in and stop the backport. This is what we should do. |
This case specifically I am seeing
Let's open a discussion in the mailing list and see if we can see the agreement. I think this was not the first time we talked about this and think it's better to open a proper discussion and make a decision - so you basically mean any behaviour changes shouldn't be backported? |
To improve the usability, we should change it in the master branch. My point is we should not backport this PR to 2.3 release.
Sure, let me lead the discussion in the dev channel and welcome you to add the inputs there. Next, we should also discuss the rule which PRs can be backported to RC branches when we do the release. In Spark 2.3 release, we backported many PRs that should not be merged to the release candidate branches. |
|
This is not an new feature addition .. this fixes an exiting functionality to work as expected and consistently .. |
|
Fixing API inconsistency should not be treated as a bug fix. Please give me a few days. I need to summarize the Spark 2.3 release and list all the PRs that were backported to the release candidate branches. Thanks! |
|
This is not just about just inconsistency but a bug. The previous behaivour doesn't make sense and I can't imagine the way it breaks external apps in principle. Also, it fixes actual use cases. Sure, no need to rush. |
|
Like what I said above, we need to be very careful when backporting the PR with the behavior changes, especially when this is neither a critical issue nor a regression. Even if this is a bug based on your understanding, we should still not backport such PRs. |
|
I am not saying we shouldn't be careful. I am trying to be careful when I backport. So, your reasons are:
|
|
cc @rdblue and @steveloughran too who I guess should be interested in setting up a backporting policy. |
|
This is one of those great problems in software engineering: no good answer. I think case-by-case is generally the best tactic, with a bias against feature backport, though my track record is a bit mixed. Patches which fix security issues at the expense of compatibility are real problems here: they need to go in even knowing stuff will break —especially when you quietly push it out with an innocuous JIRA title until you actually do the releases. People start complaining that XML entity expansion has has stopped working, REST APIs failing if unauthed, when that is the exact outcome intended, Talk to @templedf for a good policy here |
|
This was a bug fix from my perspective and looked to be low risk. I don't think this changes any behavior for the user, except if you do a |
|
I agree with what @srowen said:
I also think that this is definitely a bug fix and that it is worth backporting to 2.3. @gatorsmile: it's a stretch to say that this isn't a bug fix because it isn't a regression or isn't critical. I would very reasonably expect this behavior, especially because it is the current behavior in Scala. |
|
It looks to me this is a bug fix that can merit backporting, as QueryExecutionListener is also marked as experimental, In this case, I think @gatorsmile worrys one might have written a listener that enumerates the possible function names, and that listener will fail now with a new action name. I feel this is quite unlikely, but I also appreciate @gatorsmile's concern for backward compatibility, and I've certainly been wrong before when our fixes break existing workloads. (On the spectrum of being extremely conservative to extremely liberal w.r.t. backward compatibility, I think I'm in general more on the middle, whereas @gatorsmile probably leans more to the conservative side. There isn't really anything wrong with this, and it's good to have balancing forces in a project.) How about this, @HyukjinKwon -- for the 2.3.x backport, add a config that so it is possible to turn this off in production, if somebody actually has their job failed because of this? It's a small delta from what this PR already does, and that should alleviate the concerns @gatorsmile has. I'd also change the function doc for onSuccess/onFailure to make it clear that we will add new function names in the future, and users shouldn't expect a fixed list of function names. |
|
I am okay if there's a specific reason. I think this is the point - if there's a specific reason, that should be mentioned and explained ahead. Actually, I (and @srowen did as well IIUC) asked this many times, see above. I would have investigated or would have just said that I am okay with reverting. I don't usually get in the way if there's a specific reason. It would be great if we can have more open talks next time.
I am personally fine with reverting or adding a configuration if that's what you guys feel strongly; however, I should say it sounds unusual to have a config to control this behaviour in branch-2.3 alone and it sounds less worth. The case you mention sounds really unlikely and I wonder if that makes sense tho. It's also experimental as you all said. Also, I should note that I have been confused about the backporting policy and the bunch of configurations to control each behaviour. If that's just concerns to be addressed, that's fine but sounds what people must follow so far. If this is true, I feel sure this should be documented and we shouldn't have such overhead next time. I am pretty sure this isn't the first time. |
|
Adding a flag just in 2.3 is, at least, an unusual thing to do. By this logic lots of backports should be flag protected but we don't. Why is this special? I still don't see much argument against this backport. I count about 3-4 committers in favor and 1 against. Let's leave it. |
|
I might not explain it well. Sorry for the misunderstanding. Thank you @rxin for helping me clarify my points. It sounds like many of you think this backport is fine. I am not against this specific PR. We do not need to revert the PR but just improve the documentation. That should be fine, although I still personally prefer to adding the configuration. As what I said in the original PR #21007 that was merged to master, let me point out two points here too.
Before we finalize the backport policy, below is my inputs about the whitelist which we can backport:
Avoid backporting the PRs if it contains
In the OSS community, I believe no committer will be fired just because we merged/introduced a bug, right? If the users application failed due to an upgrade, normally we blame our users or the bug are just accidentally introduced. However, this is not acceptable in my first team. Let me share what I experienced. Just various customer accidents in my related product teams.
If all the above people believes Spark is the best product in Big Data, we need to be more conservative. Our decisions could affect many people. This is not the first time I argued with the other committers/contributors about the PR quality. In one previous PR, I left almost 100 comments just because the documents are not accurate. If my above comments offend anyone, I apologize. Everyone has different understanding about the software development because we have different work experience. The whole community already did a wonderful job compared with the other open source projects. I still believe we can do a better job, right? Let us formalize the backport policy and enforce them in each release. |
|
I do not see a problem with the commit message here. Is that really the issue? it accurately describes what changes. The why has always been documented in discussion, and it is here already. Sometimes the why is documented in comments too; I don't see a particular need for that here, but, if that's the issue, why isn't that what we're talking about? You continue to portray this as a behavior change, and I think you mean "a change in what is considered correct behavior". However all the other comments suggest otherwise; the argument from consistency seems much stronger. Your proposed criteria for backports sort of align with accepted practice, which is to follow semver.org semantics. I think semver is reasonably clear, in general and in this case. I see broad agreement for this backport, and people simply disagree with your interpretation. It is not a failure to understand criteria. Believe me, people here have plenty experience with software, versioning, and the impact of changes. I'd put more faith in the judgment of your peers. Your anecdotes are of a type that's familiar to many people, but, I also fail to see how they're relevant here. You are adopting a 'conservative' position and I think in this case it's out of line with normal practice. I think you should accept that people disagree and move on. |
|
I am fine to accept different opinions for this specific PR. Reverting this PR is not my goal here. This is a public community. It sounds like the commit message clearly delivers what this PR does to you: I am also glad you agree on the backport policy I proposed above. Hopefully, everyone is on the same page for avoiding unnecessary overhead.
I personally thought this PR fits this category. No matter whether the behavior changes are correct or not, we should still not backport it if the issue is neither critical nor a regression. That is what I emphasized in the above argument multiple times. The API inconsistency is not rare between our APIs. We did not backport these PRs. Now, I am fine to backport it because it is an experimental API. Thus, we can say we do not guarantee the backport compatibility. If it were a public API, I would insist my original opinion. I am also glad many community members have a lot of experience with mission critical software development. This can help improve documentation, code quality and test coverage. Development of application/mobile software is completely different from development of system software. We are in the right direction. We need to enforce it with stricter discipline. |
|
|
@steveloughran Agree. We always can make an exception if most need it. Normally, in these cases, we should make it configurable. That means, users can turn them off by changing the SQLConf.
I am trying to summarize what we did in the Spark 2.3 release. It took almost 2 month to release it. Will send the postmortem to the community with some proposal about the backport policy. |
What changes were proposed in this pull request?
This PR proposes to add
collectto a query executor as an action.Seems
collect/collectwith Arrow are not recognised viaQueryExecutionListeneras an action. For example, if we have a custom listener as below:and set
spark.sql.queryExecutionListenerstoorg.apache.spark.sql.TestQueryExecutionListenerOther operations in PySpark or Scala side seems fine:
but ..
Before
After
How was this patch tested?
I have manually tested as described above and unit test was added.