Conversation

@HeartSaVioR
Contributor

What changes were proposed in this pull request?

This patch fixes a bug that causes an NPE in SQLConf.get, which can only occur when SparkContext._dagScheduler is null because the SparkContext is stopping. The existing logic doesn't consider that the active SparkContext could be in the process of stopping.

Note that it isn't easy to encounter, as SparkContext.stop() blocks the main thread, but there are many cases where SQLConf.get is accessed concurrently while SparkContext.stop() is executing - users running other threads, or a listener accessing SQLConf.get after dagScheduler has been set to null (the latter is the case I encountered).
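For reference, the shape of the fix is to treat dagScheduler as possibly null instead of dereferencing it directly. A minimal sketch of the idea, simplified from the patch (surrounding code in SQLConf.get elided):

// Inside SQLConf.get: decide whether we're on the scheduler's event loop thread.
// Wrapping the possibly-null dagScheduler in Option makes the stopping case fall
// through to false instead of throwing an NPE.
val isSchedulerEventLoopThread = SparkContext.getActive
  .flatMap { sc => Option(sc.dagScheduler) }  // was .map(...): NPE when dagScheduler is null
  .map(_.eventProcessLoop.eventThread)
  .exists(_.getId == Thread.currentThread().getId)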

Why are the changes needed?

The bug causes an NPE.

Does this PR introduce any user-facing change?

No

How was this patch tested?

The previous patch, #25753, was tested with a new UT, but since the test disrupted other tests in concurrent test runs, it is excluded from this patch.

@HeartSaVioR
Contributor Author

cc. @dongjoon-hyun @srowen @vanzin @HyukjinKwon who reviewed previous PR.

This is a revised version of #25753 - please refer to #25753 (comment) to see why that patch was reverted and why the new patch doesn't contain a test.

@SparkQA

SparkQA commented Sep 14, 2019

Test build #110591 has finished for PR 25790 at commit db24787.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@dongjoon-hyun
Member

Thank you, @HeartSaVioR .

BTW, if there is anyone who can investigate the previous failure, I believe it is you. Could you take a look at it?

@HeartSaVioR
Contributor Author

https://amplab.cs.berkeley.edu/jenkins/view/Spark%20QA%20Test%20(Dashboard)/job/spark-master-test-maven-hadoop-2.7-jdk-11-ubuntu-testing/1554/consoleFull

There's a thread leak after running the previous test:

- SPARK-29046: SQLConf.get shouldn't throw NPE when active SparkContext is stopping
19:20:51.356 WARN org.apache.spark.sql.internal.SQLConfSuite: 

===== POSSIBLE THREAD LEAK IN SUITE o.a.s.sql.internal.SQLConfSuite, thread names: executor-heartbeater, Timer-684, dag-scheduler-event-loop, Timer-685 =====

and it affects other suites.

- determining the number of reducers: complex query 2
19:21:03.377 WARN org.apache.spark.rpc.netty.NettyRpcEnv: Ignored failure: java.util.concurrent.RejectedExecutionException: Task java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask@56f4464f[Not completed, task = java.util.concurrent.Executors$RunnableAdapter@5c0c2d3f[Wrapped task = org.apache.spark.rpc.netty.NettyRpcEnv$$anon$1@52cbd8ce]] rejected from java.util.concurrent.ScheduledThreadPoolExecutor@13f6d2f5[Terminated, pool size = 0, active threads = 0, queued tasks = 0, completed tasks = 0]
19:21:03.378 WARN org.apache.spark.executor.Executor: Issue communicating with driver in heartbeater
org.apache.spark.SparkException: Exception thrown in awaitResult: 
	at org.apache.spark.util.ThreadUtils$.awaitResult(ThreadUtils.scala:228)
	at org.apache.spark.rpc.RpcTimeout.awaitResult(RpcTimeout.scala:75)
	at org.apache.spark.rpc.RpcEndpointRef.askSync(RpcEndpointRef.scala:103)
	at org.apache.spark.executor.Executor.reportHeartBeat(Executor.scala:911)
	at org.apache.spark.executor.Executor.$anonfun$heartbeater$1(Executor.scala:205)
	at scala.runtime.java8.JFunction0$mcV$sp.apply(JFunction0$mcV$sp.java:23)
	at org.apache.spark.util.Utils$.logUncaughtExceptions(Utils.scala:1960)
	at org.apache.spark.Heartbeater$$anon$1.run(Heartbeater.scala:46)
	at java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:515)
	at java.base/java.util.concurrent.FutureTask.runAndReset(FutureTask.java:305)
	at java.base/java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:305)
	at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
	at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
	at java.base/java.lang.Thread.run(Thread.java:834)
Caused by: org.apache.spark.rpc.RpcEnvStoppedException: RpcEnv already stopped.
	at org.apache.spark.rpc.netty.Dispatcher.postMessage(Dispatcher.scala:160)
	at org.apache.spark.rpc.netty.Dispatcher.postLocalMessage(Dispatcher.scala:137)
	at org.apache.spark.rpc.netty.NettyRpcEnv.askAbortable(NettyRpcEnv.scala:240)
	at org.apache.spark.rpc.netty.NettyRpcEndpointRef.askAbortable(NettyRpcEnv.scala:546)
	at org.apache.spark.rpc.netty.NettyRpcEndpointRef.ask(NettyRpcEnv.scala:550)
	at org.apache.spark.rpc.RpcEndpointRef.askSync(RpcEndpointRef.scala:102)
	... 11 more

and

- Union two datasets with different pre-shuffle partition number
19:21:06.093 WARN org.apache.spark.sql.execution.ReduceNumShufflePartitionsSuite: 

===== POSSIBLE THREAD LEAK IN SUITE o.a.s.sql.execution.ReduceNumShufflePartitionsSuite, thread names: QueryStageCreator-21, QueryStageCreator-26, QueryStageCreator-19, QueryStageCreator-25, QueryStageCreator-22, QueryStageCreator-29, QueryStageCreator-18, QueryStageCreator-17, QueryStageCreator-23, QueryStageCreator-24, QueryStageCreator-31, QueryStageCreator-28, QueryStageCreator-27, QueryStageCreator-16, QueryStageCreator-30, QueryStageCreator-20 =====

19:21:06.118 WARN org.apache.spark.sql.SparkSession: An existing Spark session exists as the active or default session.
This probably means another suite leaked it. Attempting to stop it before continuing.
This existing Spark session was created at:

scala.runtime.java8.JFunction0$mcV$sp.apply(JFunction0$mcV$sp.java:23)
org.scalatest.OutcomeOf.outcomeOf(OutcomeOf.scala:85)
org.scalatest.OutcomeOf.outcomeOf$(OutcomeOf.scala:83)
org.scalatest.OutcomeOf$.outcomeOf(OutcomeOf.scala:104)
org.scalatest.Transformer.apply(Transformer.scala:22)
org.scalatest.Transformer.apply(Transformer.scala:20)
org.scalatest.FunSuiteLike$$anon$1.apply(FunSuiteLike.scala:186)
org.apache.spark.SparkFunSuite.withFixture(SparkFunSuite.scala:149)
org.scalatest.FunSuiteLike.invokeWithFixture$1(FunSuiteLike.scala:184)
org.scalatest.FunSuiteLike.$anonfun$runTest$1(FunSuiteLike.scala:196)
org.scalatest.SuperEngine.runTestImpl(Engine.scala:289)
org.scalatest.FunSuiteLike.runTest(FunSuiteLike.scala:196)
org.scalatest.FunSuiteLike.runTest$(FunSuiteLike.scala:178)
org.apache.spark.SparkFunSuite.org$scalatest$BeforeAndAfterEach$$super$runTest(SparkFunSuite.scala:56)
org.scalatest.BeforeAndAfterEach.runTest(BeforeAndAfterEach.scala:221)
org.scalatest.BeforeAndAfterEach.runTest$(BeforeAndAfterEach.scala:214)
org.apache.spark.SparkFunSuite.runTest(SparkFunSuite.scala:56)
org.scalatest.FunSuiteLike.$anonfun$runTests$1(FunSuiteLike.scala:229)
org.scalatest.SuperEngine.$anonfun$runTestsInBranch$1(Engine.scala:396)
scala.collection.immutable.List.foreach(List.scala:392)

That was actually because the dagScheduler wasn't stopped before being assigned null. But even after I fixed that, it didn't resolve the other side effects stemming from Spark's ground rule - Spark doesn't support multiple SparkContext instances. (Other things were broken.)
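To illustrate the first point, a hypothetical sketch of the corrected teardown order (not the reverted test verbatim): the DAGScheduler owns the dag-scheduler-event-loop thread, and that thread only exits once the scheduler is stopped, so the stop must happen before the reference is dropped.

// Hypothetical sketch: stop the scheduler before nulling the field, otherwise its
// "dag-scheduler-event-loop" thread keeps running and leaks into later suites.
val scheduler = sparkContext.dagScheduler
if (scheduler != null) {
  scheduler.stop()  // terminates the event loop thread owned by the scheduler
}
sparkContext.dagScheduler = null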

One more thing I learned this time: it doesn't work even when one instance is actively used and the other just sits idle. Never have two SparkContext instances in one JVM.

@HeartSaVioR
Contributor Author

Another possible version of the test, which doesn't swap the SparkContext:

 test("SPARK-29046: SQLConf.get shouldn't throw NPE when active SparkContext is stopping") {
    // Logically, there's only one case SQLConf.get throws NPE: there's active SparkContext,
    // but SparkContext is stopping - especially it sets dagScheduler as null.
    val dagScheduler = sparkContext.dagScheduler
    Utils.tryWithSafeFinally {
      sparkContext.dagScheduler = null
      SQLConf.get
    } {
      sparkContext.dagScheduler = dagScheduler
    }
}

but I can't guarantee the test can run concurrently with other tests - so if possible I'd rather not update the patch. I even wonder how these tests have been able to run concurrently at all when we mutate a static val (I mean an object), but I don't have unlimited bandwidth and there's a lot of remaining work on my plate (I still have many ideas in the SS area, and I've only done the first part of SPARK-28594), so I'd like to stop putting effort into this and revisit it later once my backlog is empty.
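To make the concurrency concern concrete, a hypothetical sketch of the race (the thread setup is illustrative, and sparkContext/SQLConf are assumed to be in the suite's scope): SQLConf.get consults the process-wide active SparkContext, so any suite running in parallel can hit the window in which this test has nulled dagScheduler.

// Hypothetical illustration only: concurrently running suites share the one active
// SparkContext, so mutating its dagScheduler is visible to all of them.
val saved = sparkContext.dagScheduler
val concurrentSuite = new Thread(() => {
  SQLConf.get  // stands in for another suite reading the conf in parallel
})
concurrentSuite.start()
sparkContext.dagScheduler = null   // the window other suites can observe
concurrentSuite.join()
sparkContext.dagScheduler = saved  // restore, as the test above does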

@dongjoon-hyun
Member

dongjoon-hyun commented Sep 14, 2019

Ya. All of us learned more from this. Thank you for spending your time on it.
Also, I really appreciate your efforts. In particular, your recent flaky test fixes are helping 3.0.0 a lot.

@HeartSaVioR
Contributor Author

Thanks for the kind words :) I really appreciate your consistent efforts on INFRA/background things like stabilizing the builds.

@dongjoon-hyun dongjoon-hyun changed the title [SPARK-29046] Fix NPE in SQLConf.get when active SparkContext is stopping [SPARK-29046][SQL] Fix NPE in SQLConf.get when active SparkContext is stopping Sep 15, 2019
Member

@srowen srowen left a comment

So, to be clear: we think this is a correct change (the test worked in isolation), but it's hard to test without creating problems for other tests? OK, this change still seems fine by itself.

@dongjoon-hyun
Member

Ya. I believe so, too.

Member

@dongjoon-hyun dongjoon-hyun left a comment

+1, LGTM. Merged to master again.
Thank you, @HeartSaVioR and @srowen .
Thank you, @HeartSaVioR and @srowen .

@HeartSaVioR
Contributor Author

Thanks all for reviewing and merging again. :)

@HeartSaVioR HeartSaVioR deleted the SPARK-29046-v2 branch September 15, 2019 23:43
HeartSaVioR added a commit to HeartSaVioR/spark that referenced this pull request Sep 16, 2019
… stopping

Closes apache#25790 from HeartSaVioR/SPARK-29046-v2.

Authored-by: Jungtaek Lim (HeartSaVioR) <[email protected]>
Signed-off-by: Dongjoon Hyun <[email protected]>
@HeartSaVioR
Contributor Author

HeartSaVioR commented Sep 16, 2019

FYI: I've raised #25798 against branch-2.4, as the branch where I originally found the issue was 2.4.x. (Sorry, I forgot to add those versions to the affected versions.) Please let me know if we wouldn't back-port a case like this - I'll close it.
