-
Notifications
You must be signed in to change notification settings - Fork 28.9k
[SPARK-27323][CORE][SQL][STREAMING] Use Single-Abstract-Method support in Scala 2.12 to simplify code #24241
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Conversation
| override def apply(key: ContextBarrierId): ContextBarrierState = | ||
| new ContextBarrierState(key, numTasks) | ||
| }) | ||
| states.computeIfAbsent(barrierId, |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
90% of the changes are like this, converting an anonymous inner class to a SAM expression.
| /** Get an optional value, applying variable substitution. */ | ||
| private[spark] def getWithSubstitution(key: String): Option[String] = { | ||
| getOption(key).map(reader.substitute(_)) | ||
| getOption(key).map(reader.substitute) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
In the files I did change, I cleaned up a few other things in nearby code, like this. Other examples are using .nonEmpty and removing redundant braces, etc
|
Test build #104088 has finished for PR 24241 at commit
|
|
Test build #104100 has finished for PR 24241 at commit
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
All transformations look nice. Thanks! If we add the following two cases, it will be complete. Could you add these together in this PR, @srowen ?
KinesisCheckpointerSuite.scala
- when(checkpointerMock.checkpoint(anyString)).thenAnswer(new Answer[Unit] {
- override def answer(invocations: InvocationOnMock): Unit = {
- clock.waitTillTime(clock.getTimeMillis() + checkpointInterval.milliseconds / 2)
- }
- })
+ when(checkpointerMock.checkpoint(anyString)).thenAnswer((_: InvocationOnMock) =>
+ clock.waitTillTime(clock.getTimeMillis() + checkpointInterval.milliseconds / 2))SparkSQLCLIDriver.scala
- HiveInterruptUtils.add(new HiveInterruptCallback {
- override def interrupt() {
- // Handle remote execution mode
- if (SparkSQLEnv.sparkContext != null) {
- SparkSQLEnv.sparkContext.cancelAllJobs()
- } else {
- if (transport != null) {
- // Force closing of TCP connection upon session termination
- transport.getSocket.close()
- }
+ HiveInterruptUtils.add(() => {
+ // Handle remote execution mode
+ if (SparkSQLEnv.sparkContext != null) {
+ SparkSQLEnv.sparkContext.cancelAllJobs()
+ } else {
+ if (transport != null) {
+ // Force closing of TCP connection upon session termination
+ transport.getSocket.close()|
Test build #104120 has finished for PR 24241 at commit
|
|
Test build #104140 has finished for PR 24241 at commit
|
|
Test build #4673 has finished for PR 24241 at commit
|
|
Looks the test failures are not persistent. |
|
retest this please |
|
Test build #104151 has finished for PR 24241 at commit
|
| // multiple distinct keys might be treated as equal by the ordering. To deal with this, we | ||
| // need to read all keys considered equal by the ordering at once and compare them. | ||
| new Iterator[Iterator[Product2[K, C]]] { | ||
| val it = new Iterator[Iterator[Product2[K, C]]] { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Nit (more like a question): why a new val is introduced here?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I was trying to convert the flatMap(i => i) below to simply flatten. For reasons even I'm not clear about, I had to introduce an intermediate val here then return it.flatten to get it to work. Seems harmless enough as a change but I didn't get the difference in type inference
| runner.runCommandWithRetry(processBuilder, p => (), supervise = superviseRetry) | ||
| } | ||
| }).when(runner).prepareAndRunDriver() | ||
| doAnswer((_: InvocationOnMock) => |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Nit: In this case would not we prefer to have {, like:
doAnswer { (_: InvocationOnMock) =>
}
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I can change it. I don't have a strong preference. I default to not using blocks where not necessary, but for non-trivial blocks it's probably clearer to use them anyway
| throw new IllegalStateException("hostA should be on the blacklist") | ||
| } | ||
| when(allocationClientMock.killExecutorsOnHost("hostA")).thenAnswer((_: InvocationOnMock) => | ||
| if (blacklist.nodeBlacklist.contains("hostA")) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Is the comment lost on purpose?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Oops, no probably lost by automated refactoring
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I am kinda glad. Because I started to question whether I would like to read this all... Now I see it has value so, I continue it :)
| throw new IllegalStateException("hostA should be on the blacklist") | ||
| } | ||
| when(allocationClientMock.killExecutorsOnHost("hostA")).thenAnswer((_: InvocationOnMock) => | ||
| if (blacklist.nodeBlacklist.contains("hostA")) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Now I am quite sure it is deleted on purpose.
| def numReceivers(): Int = { | ||
| receiverInputStreams.size | ||
| } | ||
| def numReceivers(): Int = receiverInputStreams.length |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This fine (I just would like to help for the next reviewer), so the reason of this change:
Replace .size with .length on arrays and strings
Inspection info: This inspection reports array.size and string.size calls. While such calls are legitimate, they require an additional implicit conversion to SeqLike to be made. A common use case would be calling length on arrays and strings may provide significant advantages.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Yep, we ought to make this kind of change (and use lengthCompare on a similar note where sizes are compared) wherever perf matters. Probably a good habit in general, but, wouldn't change it unless the code is already changing. Here I just took the liberty of adjusting this one, though there's no big reason for it here.
More generally I have a number of large code cleanup changes I want to get into Spark 3, and although I won't change this particular issue wholesale, I do want to get in some code cleanup before a new major version.
|
There are a few more "((" which could be transformed to " { (": But I am fine to keep them as it is. |
|
Test build #4676 has finished for PR 24241 at commit
|
| // Use the reverse order because PriorityQueue dequeues the max | ||
| override def compare(x: Iter, y: Iter): Int = comparator.compare(y.head._1, x.head._1) | ||
| }) | ||
| val heap = new mutable.PriorityQueue[Iter]()( |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Now I am checking for missing comments by grep. It is semi-automatic so I am just sharing you the places I have found, here we lost:
// Use the reverse order because PriorityQueue dequeues the max
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Sure, I can restore that, it's minor
| val dataSource = | ||
| DataSource( | ||
| sparkSession, | ||
| // In older version(prior to 2.1) of Spark, the table schema can be empty and should be |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
No comment is missing but should we still support this?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Fair question. I won't change it here, as maybe there's still a use case for reading stuff written by Spark 2.1 in Spark 2.4, for example.
attilapiros
left a comment
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Some questions left but after that LGTM.
|
Test build #104168 has finished for PR 24241 at commit
|
|
Test build #104169 has finished for PR 24241 at commit
|
|
Test build #104171 has finished for PR 24241 at commit
|
|
Test build #4677 has finished for PR 24241 at commit
|
|
#24268 is merged now. |
|
Retest this please. |
|
Test build #104185 has finished for PR 24241 at commit
|
|
Retest this please. |
|
Retest this please. (There was a revert in |
|
Test build #104193 has finished for PR 24241 at commit
|
|
This must be either a flaky test or something with the auto-merge with master as I have checked this PR out locally and run: In |
|
Retest this please. |
|
Test build #104197 has finished for PR 24241 at commit
|
|
Test build #104198 has finished for PR 24241 at commit
|
|
@attilapiros . I reverted the offending commit from the master. :) It was a persistent UT failure across all PRs and |
dongjoon-hyun
left a comment
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
+1, LGTM. Merged to master.
Thank you, @srowen and @attilapiros !
|
Test build #104212 has finished for PR 24241 at commit
|
What changes were proposed in this pull request?
Use Single Abstract Method syntax where possible (and minor related cleanup). Comments below. No logic should change here.
How was this patch tested?
Existing tests.